package com.example.dobs.demo.flink.io.tool;






import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

/**
 * Small demo utility showing four ways to stream a file to standard output:
 * plain {@link URL}, {@link URL} with Hadoop's stream handler factory,
 * the Hadoop {@link FileSystem} API, and the Flink file system API.
 */
public class HDFSReader {
    /** Local file read when a caller passes {@code null} to {@link #javaUrlReader(String)}. */
    private static final String DEFAULT_LOCAL_PATH = "file:///Users/msxr/develop/var/tmp/employee.txt";

    /** HDFS file read when a caller passes {@code null} to the HDFS-capable readers. */
    private static final String DEFAULT_HDFS_PATH = "hdfs://localhost:9000/input/aaa.txt";

    /** Size in bytes of the buffer used while copying to standard output. */
    private static final int COPY_BUFFER_SIZE = 4096;

    /** Whether this class has already installed {@link FsUrlStreamHandlerFactory}; guarded by the class lock. */
    private static boolean hdfsUrlHandlerInstalled;

    /**
     * Reads {@code path} through a plain {@link URL} and copies it to standard output.
     * A plain {@code java.net.URL} does not recognize the {@code hdfs} scheme by default,
     * so this is intended for schemes such as
     * {@code file:///Users/msxr/develop/var/tmp/employee.txt}.
     *
     * @param path URL to read; when {@code null} a default local file URL is used
     * @throws IOException if the URL cannot be opened or copying fails
     */
    public static void javaUrlReader( String path) throws IOException {
        String target = (path != null) ? path : DEFAULT_LOCAL_PATH;
        copyToStdoutAndClose(new URL(target).openStream());
    }

    /**
     * Reads {@code path} through {@link URL} after registering Hadoop's
     * {@link FsUrlStreamHandlerFactory}, which adds support for the {@code hdfs} scheme.
     * {@code java.net.URL.setURLStreamHandlerFactory()} may only be called once per JVM,
     * so the registration is performed on the first call only; later calls reuse it.
     * If some other code has already installed a factory, the JDK raises an
     * {@link Error} from the registration, which is propagated unchanged.
     *
     * @param path URL to read; when {@code null} a default HDFS URL is used
     * @throws IOException if the URL cannot be opened or copying fails
     */
    public static void urlCat(String path) throws IOException {
        String target = (path != null) ? path : DEFAULT_HDFS_PATH;
        installHdfsUrlHandlerOnce();
        copyToStdoutAndClose(new URL(target).openStream());
    }

    /**
     * Reads {@code path} through the Hadoop {@link FileSystem} API, using a
     * default-constructed {@link Configuration}, and copies it to standard output.
     *
     * @param path file URI to read; when {@code null} a default HDFS URI is used
     * @throws IOException if the file system or file cannot be opened, or copying fails
     */
    public static void fsOpen(String path) throws IOException {
        String target = (path != null) ? path : DEFAULT_HDFS_PATH;
        Path fsPath = new Path(target);
        FileSystem fs = fsPath.getFileSystem(new Configuration());
        copyToStdoutAndClose(fs.open(fsPath));
    }

    /**
     * Reads {@code path} through the Flink file system API and copies it to standard output.
     *
     * @param path file URI to read; when {@code null} a default HDFS URI is used
     * @throws IOException if the file system or file cannot be opened, or copying fails
     */
    public static void flinkFsOpen(String path) throws IOException {
        String target = (path != null) ? path : DEFAULT_HDFS_PATH;
        org.apache.flink.core.fs.Path fsPath = new org.apache.flink.core.fs.Path(target);
        org.apache.flink.core.fs.FileSystem fs = org.apache.flink.core.fs.FileSystem.get(fsPath.toUri());
        copyToStdoutAndClose(fs.open(fsPath));
    }

    /**
     * Demo entry point: streams the default local file to standard output via the Flink API.
     *
     * @param args ignored
     * @throws IOException if reading the file fails
     */
    public static void main(String[] args) throws IOException {
        // Pass DEFAULT_HDFS_PATH instead to read from the local HDFS instance.
        flinkFsOpen(DEFAULT_LOCAL_PATH);
    }

    /** Registers {@link FsUrlStreamHandlerFactory} with {@link URL} the first time it is called. */
    private static synchronized void installHdfsUrlHandlerOnce() {
        if (hdfsUrlHandlerInstalled) {
            return;
        }
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        // Only mark as installed after the call succeeded, so a failure is reported again on retry.
        hdfsUrlHandlerInstalled = true;
    }

    /** Copies {@code in} to {@code System.out} and then closes {@code in}, ignoring close failures. */
    private static void copyToStdoutAndClose(InputStream in) throws IOException {
        try {
            IOUtils.copyLarge(in, System.out, new byte[COPY_BUFFER_SIZE]);
        } finally {
            IOUtils.closeQuietly(in);
        }
    }
}
